package pbadger

import (
	"log"
	"runtime"
	"sync"

	badger "github.com/dgraph-io/badger/v2"
	ants "github.com/panjf2000/ants/v2"
)

// PBadgerReader holds a set of opened Badger databases that are read together.
type PBadgerReader struct {
	// Badgers maps each directory path given to NewPBadgerReader to the
	// database opened from it. Close sets it to nil.
	Badgers map[string]*badger.DB
}

// readArgs is the payload handed to a pool worker for reading one database.
type readArgs struct {
	// Dir is the directory the database was opened from; it is only used to
	// label log messages.
	Dir string
	// Prefix is the key prefix to scan for.
	Prefix string
	// DB is the database to scan.
	DB *badger.DB
}

// NewPBadgerReader opens one Badger database per entry in dirs, using
// badger.DefaultOptions for each, and returns a reader keyed by directory.
//
// If any database fails to open, the databases opened so far are closed and
// the open error is returned together with a nil reader.
func NewPBadgerReader(dirs []string) (*PBadgerReader, error) {
	reader := &PBadgerReader{Badgers: make(map[string]*badger.DB)}

	for i := range dirs {
		path := dirs[i]

		handle, openErr := badger.Open(badger.DefaultOptions(path))
		if openErr != nil {
			// Release whatever was opened before the failure.
			reader.Close()
			return nil, openErr
		}

		reader.Badgers[path] = handle
	}

	return reader, nil
}

// Close closes every database held by the reader and sets Badgers to nil.
//
// Close has no error return, so a failure to close an individual database is
// logged (with its directory) instead of being silently discarded; the
// remaining databases are still closed. Calling Close on a reader whose
// Badgers map is already nil is a no-op.
func (pb *PBadgerReader) Close() {
	// Ranging over a nil map iterates zero times, so no explicit nil guard
	// is needed.
	for dir, db := range pb.Badgers {
		if db == nil {
			continue
		}
		if err := db.Close(); err != nil {
			log.Printf("[%s] close badger error: %s\n", dir, err)
		}
	}

	pb.Badgers = nil
}

// Read scans every database in pb.Badgers for keys starting with key and
// sends the values found to dataCh (see readOneBadger for exactly which
// values are sent). dataCh is closed when Read returns.
//
// The databases are scanned concurrently by an ants worker pool sized to
// runtime.NumCPU(). Read blocks until every scan has finished, so the caller
// must be draining dataCh (typically from another goroutine) while Read runs.
//
// Read has no error return: failure to create the pool or to submit a task
// terminates the process via log.Fatalf, and an error returned by a single
// database scan is logged.
func (pb *PBadgerReader) Read(key string, dataCh chan []byte) {
	defer close(dataCh)

	var wg sync.WaitGroup

	f := func(payload interface{}) {
		// Deferred so the WaitGroup is released even if the scan panics:
		// ants recovers panics raised inside its workers, and without this
		// wg.Wait below would then block forever.
		defer wg.Done()

		args := payload.(*readArgs)
		if err := readOneBadger(args, dataCh); err != nil {
			log.Printf("[%s] readOneBadger error: %s\n", args.Dir, err)
		}
	}

	p, err := ants.NewPoolWithFunc(runtime.NumCPU(), f)
	if err != nil {
		log.Fatalf("Start Gouroutine Pool error: %s\n", err)
	}
	defer p.Release()

	for dir, db := range pb.Badgers {
		// Add before Invoke so the worker's Done can never run ahead of it.
		wg.Add(1)
		if err = p.Invoke(&readArgs{
			Dir:    dir,
			DB:     db,
			Prefix: key,
		}); err != nil {
			log.Fatalf("Invoke readOneBadger error: %s\n", err)
		}
	}

	wg.Wait()
}

// readOneBadger scans args.DB inside a read-only transaction for keys that
// start with args.Prefix and sends a private copy of each value to dataCh.
// The key that is exactly equal to args.Prefix is skipped.
//
// The send on dataCh blocks until a receiver takes the value, so the
// transaction stays open for as long as the consumer takes to drain it.
//
// It returns the result of args.DB.View. A failure to read a value does not
// surface as a returned error: it terminates the process via log.Fatalf.
// NOTE(review): returning the error instead would be preferable, but the
// caller must be changed to handle it first.
func readOneBadger(args *readArgs, dataCh chan []byte) error {
	// Convert once; doing it in the loop header would redo the conversion
	// for every key visited.
	prefix := []byte(args.Prefix)

	return args.DB.View(func(txn *badger.Txn) error {
		it := txn.NewIterator(badger.DefaultIteratorOptions)
		defer it.Close()

		for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
			item := it.Item()
			k := item.Key()

			// Skip the entry whose key is the bare prefix before asking for
			// its value, so the value callback is never run for it.
			if string(k) == args.Prefix {
				continue
			}

			err := item.Value(func(v []byte) error {
				// v is only passed to this callback, so hand the consumer
				// its own copy rather than the slice itself.
				nb := make([]byte, len(v))
				copy(nb, v)
				dataCh <- nb
				return nil
			})
			if err != nil {
				log.Fatalf("[%s] At KEY: %s, readOneBadger error: %s\n", args.Dir, k, err)
			}
		}
		return nil
	})
}
